package cn.itcast.szse;

import cn.itcast.util.KafkaUtil;

import java.io.DataInputStream;
import java.io.InputStream;
import java.net.Socket;

/**
 * 深市接收服务端数据，发送kafka
 */
public class SzseSocketClient {
    /**
     * 开发步骤:
     * 1.创建main方法
     * 2.建立socket连接，获取流数据
     * 3.解析行数据，数据转换
     * 4.发送kafka
     */
    public static void main(String[] args) throws Exception {

        // 2.建立socket连接，获取流数据
        Socket socket = new Socket("localhost", 4444);
        InputStream ins = socket.getInputStream();
        DataInputStream dataInputStream = new DataInputStream(ins);
        //创建kafka生产者对象
        KafkaUtil kafkaPro = new KafkaUtil();
        while (true) {

            //实时读取数据
            String str = dataInputStream.readUTF();
            //3.解析行数据，数据转换

            //4.发送kafka
            kafkaPro.sendData("szse",str);

        }
    }

}
